Crate object_store
§object_store
This crate provides a uniform API for interacting with object storage services and local files via the ObjectStore trait.
Using this crate, the same binary and code can run in multiple clouds and local test environments, via a simple runtime configuration change.
§Highlights
- A high-performance async API focused on providing a consistent interface mirroring that of object stores such as S3
- Production quality: this crate is used in large-scale production systems such as crates.io and InfluxDB IOx
- Support for advanced functionality, including atomic, conditional reads and writes, vectored IO, bulk deletion, and more…
- Stable and predictable governance via the Apache Arrow project
- Small dependency footprint, depending on only a small number of common crates
Originally developed by InfluxData and subsequently donated to Apache Arrow.
§Available ObjectStore Implementations
By default, this crate provides the following implementations:
- Memory: InMemory
- Local filesystem: LocalFileSystem
Feature flags are used to enable support for other implementations:
- gcp: Google Cloud Storage support. See GoogleCloudStorageBuilder
- aws: Amazon S3. See AmazonS3Builder
- azure: Azure Blob Storage. See MicrosoftAzureBuilder
- http: HTTP/WebDAV Storage. See HttpBuilder
§Why not a Filesystem Interface?
Whilst this crate does provide a BufReader, the ObjectStore interface mirrors the APIs of object stores and not filesystems, opting to provide stateless APIs instead of the cursor-based interfaces such as Read or Seek favoured by filesystems.
This provides some compelling advantages:
- All operations are atomic, and readers cannot observe partial and/or failed writes
- Methods map directly to object store APIs, providing both efficiency and predictability
- Abstracts away filesystem and operating system specific quirks, ensuring portability
- Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads
§Adapters
ObjectStore instances can be composed with various adapters that add functionality:
- Rate Throttling: ThrottleConfig
- Concurrent Request Limit: LimitStore
§Configuration System
This crate provides a configuration system inspired by the APIs exposed by fsspec, PyArrow FileSystem, and Hadoop FileSystem, allowing a DynObjectStore to be created from a URL and an optional list of key-value pairs. This provides a flexible interface to support a wide variety of user-defined store configurations, with minimal additional application complexity.
```rust
// Requires the `aws` feature
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::{parse_url, parse_url_opts};
use url::Url;

// Can manually create a specific store variant using the appropriate builder
let store: AmazonS3 = AmazonS3Builder::from_env()
    .with_bucket_name("my-bucket").build().unwrap();

// Alternatively can create an ObjectStore from an S3 URL
let url = Url::parse("s3://bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");

// Potentially with additional options
let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();

// Or with URLs that encode the bucket name in the URL path
let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");
```
§List objects
Use the ObjectStore::list method to iterate over objects in remote storage or files in the local filesystem:
```rust
use std::sync::Arc;

use futures::stream::StreamExt;
use object_store::{path::Path, ObjectStore};

// create an ObjectStore (`get_object_store()` is a placeholder for any
// function returning a configured store)
let object_store: Arc<dyn ObjectStore> = get_object_store();

// Recursively list all files below the 'data' path.
// 1. On AWS S3 this would be the 'data/' prefix
// 2. On a local filesystem, this would be the 'data' directory
let prefix = Path::from("data");

// Get an `async` stream of Metadata objects:
let mut list_stream = object_store.list(Some(&prefix));

// Print a line about each object
while let Some(meta) = list_stream.next().await.transpose().unwrap() {
    println!("Name: {}, size: {}", meta.location, meta.size);
}
```
This prints something like the following:
```text
Name: data/file01.parquet, size: 112832
Name: data/file02.parquet, size: 143119
Name: data/child/file03.parquet, size: 100
...
```
§Fetch objects
Use the ObjectStore::get method to fetch the data bytes from remote storage or files in the local filesystem as a stream.
```rust
use std::sync::Arc;

use bytes::Bytes;
use futures::TryStreamExt;
use object_store::{path::Path, GetResult, ObjectStore};

// Create an ObjectStore
let object_store: Arc<dyn ObjectStore> = get_object_store();

// Retrieve a specific file
let path = Path::from("data/file01.parquet");

// Fetch just the file metadata
let meta = object_store.head(&path).await.unwrap();
println!("{meta:?}");

// Fetch the object including metadata
let result: GetResult = object_store.get(&path).await.unwrap();
assert_eq!(result.meta, meta);

// Buffer the entire object in memory
let object: Bytes = result.bytes().await.unwrap();
assert_eq!(object.len(), meta.size);

// Alternatively stream the bytes from object storage
let stream = object_store.get(&path).await.unwrap().into_stream();

// Count the zero bytes using `try_fold` from the `TryStreamExt` trait
let num_zeros = stream
    .try_fold(0, |acc, bytes| async move {
        Ok(acc + bytes.iter().filter(|b| **b == 0).count())
    }).await.unwrap();

println!("Num zeros in {} is {}", path, num_zeros);
```
§Put Object
Use the ObjectStore::put method to atomically write data.
```rust
use std::sync::Arc;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/file1");
let bytes = Bytes::from_static(b"hello");
object_store.put(&path, bytes).await.unwrap();
```
§Multipart Upload
Use the ObjectStore::put_multipart method to atomically write a large amount of data, with implementations automatically handling parallel, chunked upload where appropriate.
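```rust
use std::sync::Arc;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
// Provides `write_all`, `flush` and `shutdown` on the returned writer
use tokio::io::AsyncWriteExt;

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();

let bytes = Bytes::from_static(b"hello");
writer.write_all(&bytes).await.unwrap();
writer.flush().await.unwrap();
writer.shutdown().await.unwrap();
```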
§Vectored Read
A common pattern, especially when reading structured datasets, is needing to fetch multiple, potentially non-contiguous, ranges of a particular object. ObjectStore::get_ranges provides an efficient way to perform such vectored IO, and will automatically coalesce adjacent ranges into an appropriate number of parallel requests.
```rust
use std::sync::Arc;

use object_store::{path::Path, ObjectStore};

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
assert_eq!(ranges.len(), 3);
assert_eq!(ranges[0].len(), 10);
```
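The idea behind coalescing can be shown without any I/O: sort the requested ranges, merge any whose gap from the previous range is at most a threshold, fetch the merged ranges, then slice the original ranges back out. This is a simplified sketch of the technique, not the crate's own implementation; `merge_ranges`, `split_results`, and the `max_gap` parameter are hypothetical (the crate exposes its own default gap as a constant).

```rust
use std::ops::Range;

/// Merges requested ranges whose gap from the previous (sorted) range is at
/// most `max_gap`, so fewer, larger requests are made. Overlaps merge too.
fn merge_ranges(ranges: &[Range<usize>], max_gap: usize) -> Vec<Range<usize>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<usize>> = Vec::with_capacity(sorted.len());
    for r in sorted {
        if let Some(last) = merged.last_mut() {
            // Close enough to the previous fetch: extend it instead of adding one
            if r.start <= last.end.saturating_add(max_gap) {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        merged.push(r);
    }
    merged
}

/// Cuts each originally requested range back out of the fetched data.
/// `fetched` pairs each merged range with the bytes returned for it.
fn split_results(requested: &[Range<usize>], fetched: &[(Range<usize>, Vec<u8>)]) -> Vec<Vec<u8>> {
    requested
        .iter()
        .map(|r| {
            let (fr, data) = fetched
                .iter()
                .find(|(fr, _)| fr.start <= r.start && r.end <= fr.end)
                .expect("every requested range is covered by a merged range");
            data[r.start - fr.start..r.end - fr.start].to_vec()
        })
        .collect()
}
```

With `max_gap` set to 100, the ranges `90..100`, `400..600` and `0..10` become two fetches, `0..100` and `400..600`, while the results are still returned in the order requested. A larger threshold trades extra bytes transferred for fewer requests.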
§Conditional Fetch
More complex object retrieval can be supported by ObjectStore::get_opts. For example, it can efficiently refresh a cache without re-fetching the object data when the object hasn't been modified:
```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytes::Bytes;
use object_store::{path::Path, Error, GetOptions, ObjectStore, Result};

struct CacheEntry {
    /// Data returned by last request
    data: Bytes,
    /// ETag identifying the object returned by the server
    e_tag: String,
    /// Instant of last refresh
    refreshed_at: Instant,
}

/// Example cache that checks entries after 10 seconds for a new version
struct Cache {
    entries: HashMap<Path, CacheEntry>,
    store: Arc<dyn ObjectStore>,
}

impl Cache {
    pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
        Ok(match self.entries.get_mut(path) {
            Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
                true => e.data.clone(), // Return cached data
                false => { // Check if remote version has changed
                    let opts = GetOptions {
                        if_none_match: Some(e.e_tag.clone()),
                        ..GetOptions::default()
                    };
                    match self.store.get_opts(path, opts).await {
                        Ok(d) => {
                            // Remember the new ETag so the next refresh stays conditional
                            if let Some(tag) = d.meta.e_tag.clone() {
                                e.e_tag = tag;
                            }
                            e.data = d.bytes().await?;
                        }
                        Err(Error::NotModified { .. }) => {} // Data has not changed
                        Err(err) => return Err(err),
                    };
                    e.refreshed_at = Instant::now();
                    e.data.clone()
                }
            },
            None => { // Not cached, fetch data
                let get = self.store.get(path).await?;
                let e_tag = get.meta.e_tag.clone();
                let data = get.bytes().await?;
                if let Some(e_tag) = e_tag {
                    let entry = CacheEntry {
                        e_tag,
                        data: data.clone(),
                        refreshed_at: Instant::now(),
                    };
                    self.entries.insert(path.clone(), entry);
                }
                data
            }
        })
    }
}
```
§Conditional Put
The default behaviour when writing data is to upsert any existing object at the given path, overwriting any previous value. More complex behaviours can be achieved using PutMode, which can be used to build transactions based on optimistic concurrency control. This facilitates building metadata catalogs, such as Apache Iceberg or Delta Lake, directly on top of object storage, without relying on a separate DBMS.
```rust
use object_store::{path::Path, Error, ObjectStore, PutMode, UpdateVersion};

// `get_object_store()` and `do_update()` are placeholders for your own
// store constructor and update logic
let store = get_object_store();
let path = Path::from("test");

// Perform a conditional update on path
loop {
    // Perform get request
    let r = store.get(&path).await.unwrap();

    // Save version information fetched
    let version = UpdateVersion {
        e_tag: r.meta.e_tag.clone(),
        version: r.meta.version.clone(),
    };

    // Compute new version of object contents
    let new = do_update(r.bytes().await.unwrap());

    // Attempt to commit transaction
    match store.put_opts(&path, new, PutMode::Update(version).into()).await {
        Ok(_) => break, // Successfully committed
        Err(Error::Precondition { .. }) => continue, // Object has changed, try again
        Err(e) => panic!("{e}")
    }
}
```
§TLS Certificates
Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their CA certificates. By default the system-bundled certificates are used (see rustls-native-certs). The tls-webpki-roots feature switch can be used to also bundle Mozilla's root certificates with the library/application (see webpki-roots).
Modules§
- An object store implementation for S3
- An object store implementation for Azure blob storage
- Utilities for performing tokio-style buffered IO
- A ChunkedStore that can be used to test streaming behaviour
- Utility for streaming newline delimited files from object storage
- An object store implementation for Google Cloud Storage
- An object store implementation for generic HTTP servers
- An object store that limits the maximum concurrency of the wrapped implementation
- An object store implementation for a local filesystem
- An in-memory object store implementation
- Cloud Multipart Upload
- Path abstraction for Object Storage
- An object store wrapper handling a constant path prefix
- Abstraction of signed URL generation for those object store implementations that support it
- A throttling object store wrapper
Structs§
- Exponential backoff with jitter
- HTTP client configuration for remote object stores
- Options for a get request, such as range
- Result for a get request
- Result of a list call that includes objects, prefixes (directories) and a token for the next set of results. Individual result sets may be limited to 1,000 objects based on the underlying object storage’s limitations.
- The metadata that describes an object.
- Options for a put request
- Result for a put request
- The configuration for how to respond to request errors
- A static set of credentials
- A collection of key value pairs used to annotate objects
- Uniquely identifies a version of an object to update
Enums§
- Configuration keys for ClientOptions
- A specialized Error for object store-related errors
- Request only a portion of an object's bytes
- The kind of a GetResult
- Configure preconditions for the put operation
Constants§
- Range requests with a gap less than or equal to this will be coalesced into a single request by coalesce_ranges
Traits§
- Provides credentials for use when signing requests
- Universal API to multiple object store services.
Functions§
- Takes a function fetch that can fetch a range of bytes and uses this to fetch the provided byte ranges
- Collect a stream into Bytes, avoiding copying in the event of a single chunk
- Create an ObjectStore based on the provided url
- Create an ObjectStore based on the provided url and options
Type Aliases§
- An alias for a dynamically dispatched object store implementation.
- Id type for multi-part uploads.
- A specialized Result for object store-related errors